· 6 years ago · Apr 15, 2019, 09:10 AM
1/*
2 * To change this license header, choose License Headers in Project Properties.
3 * To change this template file, choose Tools | Templates
4 * and open the template in the editor.
5 */
6package cz.eago.efence.services.templates.kaf_cass;
7
8import com.datastax.driver.core.Cluster;
9import com.datastax.driver.core.ResultSet;
10import com.datastax.driver.core.ResultSetFuture;
11import com.datastax.driver.core.Session;
12import com.datastax.driver.core.exceptions.DriverException;
13import com.datastax.driver.core.utils.UUIDs;
14import com.google.common.util.concurrent.FutureCallback;
15import com.google.common.util.concurrent.Futures;
16import cz.eago.jeecfg.annotations.Log;
17import java.util.Map;
18import java.util.logging.Level;
19import java.util.logging.Logger;
20import java.util.stream.Collectors;
21
22/**
23 *
24 */
25public class CassandraDb {
26
27 private Cluster cluster;
28 private Session session;
29
30 private final String serverIP;
31 private final String keyspace;
32
33 @Log
34 private Logger log;
35 private final boolean createKeyspace;
36
37 public static void main(String[] args) {
38 CassandraDb cassandraDb = new CassandraDb("localhost", "ttt", true);
39 cassandraDb.connect();
40 cassandraDb.disconnect();
41 }
42
43 public CassandraDb(final String serverIP, final String keyspace, final boolean createKeyspace) {
44 this.serverIP = serverIP;
45 this.keyspace = keyspace;
46 this.createKeyspace = createKeyspace;
47 }
48
49 public void connect() throws DriverException {
50 log.info("CassandraDbBean started");
51 cluster = Cluster.builder().addContactPoints(serverIP).build();
52 if (createKeyspace) {
53 final Session noKeyspaceSession = cluster.connect();
54 createKeyspace(noKeyspaceSession, keyspace);
55 noKeyspaceSession.closeAsync();
56 }
57 session = cluster.connect(keyspace);
58 }
59
60 public void disconnect() {
61 if (session != null) {
62 try {
63 session.close();
64 } catch (Exception ex) {
65 log.log(Level.WARNING, "problem in close session", ex);
66 }
67 session = null;
68 }
69 if (cluster != null) {
70 try {
71 cluster.close();
72 } catch (Exception ex) {
73 log.log(Level.WARNING, "problem in close cluster", ex);
74 }
75 cluster = null;
76 }
77 log.info("disconnected");
78 }
79
80 /**
81 * keyspace is created synchronously and session must exist
82 *
83 */
84 public static void createKeyspace(final Session kafkaSession, final String keyspaceName) throws DriverException {
85 StringBuilder sb = new StringBuilder("CREATE KEYSPACE IF NOT EXISTS ")
86 .append(keyspaceName)
87 .append(" WITH replication = {")
88 .append("'class':'")
89 .append("SimpleStrategy")
90 .append("','replication_factor':")
91 .append(1)
92 .append("};");
93 String query = sb.toString();
94 kafkaSession.execute(query);
95 }
96
97 /**
98 * table is created synchronously and session must exist
99 *
100 * @param tableName
101 * @param columnDefinitions for example "id uuid" or "json text" and so on
102 */
103 public void createTable(final String tableName, String... columnDefinitions) throws DriverException {
104 StringBuilder query = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
105 .append(tableName).append("(");
106
107 for (int i = 0; i < columnDefinitions.length; i++) {
108 if (i > 0) {
109 query.append(", ");
110 }
111 query.append(columnDefinitions[i]);
112 }
113 query.append(");");
114 log.log(Level.INFO, "will create table " + tableName);
115 session.execute(query.toString());
116 }
117
118 public String getServerIP() {
119 return serverIP;
120 }
121
122 public String getKeyspace() {
123 return keyspace;
124 }
125
126 public Session getSession() {
127 return session;
128 }
129
130 public ResultSet execute(String query) {
131 return session.execute(query);
132 }
133
134 public ResultSet execute(String query, Map<String, Object> values) {
135 return session.execute(query, values);
136 }
137
138 public ResultSetFuture executeAsync(String query) {
139 return session.executeAsync(query);
140 }
141
142 public ResultSetFuture executeAsync(String query, Map<String, Object> values) {
143 return session.executeAsync(query, values);
144 }
145
146 public ResultSetFuture insert(final String tableName, final Map<String, Object> valueMap) {
147 final String cols = valueMap.keySet().stream().collect(Collectors.joining(", "));
148 String query = "BEGIN BATCH INSERT INTO " + tableName + " (" + cols
149 + ") VALUES (:" + cols.replace(", ", ", :") + ");"
150 + "APPLY BATCH;";
151
152 return session.executeAsync(query, valueMap);
153 }
154
155 public void createRecord(String json, String tablename) {
156 String sb = "BEGIN BATCH INSERT INTO "
157 + tablename + " (id, json) "
158 + "VALUES (" + UUIDs.timeBased() + ", '" + json + "');"
159 + "APPLY BATCH;";
160 ResultSetFuture rsf = session.executeAsync(sb);
161 Futures.addCallback(rsf, new FutureCallback<ResultSet>() {
162 @Override
163 public void onSuccess(ResultSet result) {
164 session.close();
165 }
166
167 @Override
168 public void onFailure(Throwable t) {
169 session.close();
170 }
171 });
172 }
173}